DATASCI W261: Machine Learning at Scale

Version 1: One MapReduce Stage (join data at the first reducer)

Data Generation

Data Information:

  • Size: 1000 points
  • True model: y = 1.0 * x - 4
  • Noise: normally distributed, mean = 0, standard deviation = 5

In [2]:
%matplotlib inline
import numpy as np
import pylab 
size = 1000
x = np.random.uniform(-40, 40, size)
y = x * 1.0 - 4 + np.random.normal(0,5,size)
# each record is (id, y, x)
data = zip(range(size),y,x)
np.savetxt('LinearRegression.csv',data,'%i,%f,%f')

In [8]:
data[:10]


Out[8]:
[(0, 23.40928888427073, 32.917736564723114),
 (1, -30.253929548099201, -22.528345789897692),
 (2, 27.355626210091412, 28.440955855415808),
 (3, 21.808265765521085, 28.651135064922983),
 (4, -23.811553286406987, -24.563903354053032),
 (5, 19.985720829182519, 29.155013044605937),
 (6, -8.4248008896541489, -12.196508190731727),
 (7, 25.875857677304069, 26.31072814104013),
 (8, -45.575642601474954, -32.493931947624404),
 (9, 12.484475640241072, 14.902657917174345)]

Data Visualization


In [3]:
pylab.plot(x, y,'*')
pylab.show()


MrJob class code

The closed-form solution of the linear model $$ \textbf{Y} = \textbf{X}\theta $$ is: $$ \hat{\theta} = (\textbf{X}^T\textbf{X})^{-1}\textbf{X}^T\textbf{y} $$ If $\textbf{X}^T\textbf{X}$ is denoted by $A$, and $\textbf{X}^T\textbf{y}$ is denoted by $b$, then $$ \hat{\theta} = A^{-1}b $$ Two MrJob classes calculate the intermediate results (a local sanity check of the closed form follows the list):

  • linearRegressionXSquare.py calculates $A = \textbf{X}^T\textbf{X}$
  • linearRegressionXy.py calculates $b = \textbf{X}^T\textbf{y}$
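
Before distributing the computation, the closed form can be checked locally. The following is a minimal sketch (a hypothetical cell, assuming x and y from the data-generation step are still in memory):

In [ ]:
# Local sanity check of the closed-form solution
X = np.column_stack((x, np.ones(size)))  # design matrix: x plus an intercept column
A = X.T.dot(X)                           # X^T X
b = X.T.dot(y)                           # X^T y
print np.linalg.solve(A, b)              # should be close to [1.0, -4.0] (slope, intercept)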

In [4]:
%%writefile linearRegressionXSquare.py
#Version 1: One MapReduce Stage (join data at the first reducer)
from mrjob.job import MRJob

class MRMatrixX2(MRJob):
    # Emit, for each record, the values needed to compute cell (i,j) of X^T X
    def mapper(self, _, line):
        v = line.split(',')  # columns: id, y, x
        # append a constant 1 column so the intercept is fit as an extra feature
        v.append('1.0')
        for i in range(len(v)-2):
            for j in range(len(v)-2):
                yield (j,i),(int(v[0]),float(v[i+2]))
                yield (i,j),(int(v[0]),float(v[i+2]))
                
    # Pair values by record id and sum their products for cell (i,j)
    def reducer(self, key, values):
        idxdict = {}
        s = 0.0
        for idx, value in values:
            if str(idx) in idxdict:
                s = s + value * idxdict[str(idx)]
            else:
                idxdict[str(idx)] = value
        yield key,s

if __name__ == '__main__':
    MRMatrixX2.run()


Writing linearRegressionXSquare.py
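
To see the join pattern, here is a hypothetical local trace of the mapper on a single input line (not part of the job itself):

In [ ]:
# Hypothetical trace: what the mapper emits for one record
from linearRegressionXSquare import MRMatrixX2
job = MRMatrixX2(args=[])
for key, value in job.mapper(None, '0,23.409289,32.917737'):
    print key, value

Every cell (i,j) receives both x_i and x_j tagged with the record id, so the reducer can pair them by id and accumulate the products - this is the "join data at the first reducer" from the title.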

In [5]:
%%writefile linearRegressionXy.py
from mrjob.job import MRJob

class MRMatrixXY(MRJob):
    def mapper(self, _, line):
        v = line.split(',')  # columns: id, y, x
        # emit the product y*x_i for each feature column
        for i in range(len(v)-2):
            yield i, float(v[1])*float(v[i+2])
        # the intercept column is all 1s, so its entry of X^T y is just the sum of y
        yield i+1, float(v[1])
    
    # Sum up the products
    def reducer(self, key, values):
        yield key,sum(values)

if __name__ == '__main__':
    MRMatrixXY.run()


Writing linearRegressionXy.py

Driver:

The driver runs the two MrJob classes to obtain $\textbf{X}^T\textbf{X}$ and $\textbf{X}^T\textbf{y}$, then solves the system $A\hat{\theta} = b$ with numpy.linalg.solve, which avoids forming $(\textbf{X}^T\textbf{X})^{-1}$ explicitly.


In [12]:
from numpy import linalg,array,empty
from linearRegressionXSquare import MRMatrixX2
from linearRegressionXy import MRMatrixXY
mr_job1 = MRMatrixX2(args=['LinearRegression.csv'])
mr_job2 = MRMatrixXY(args=['LinearRegression.csv'])

X_Square = []
X_Y = []
# Calculate the XT*X Gram matrix
print "Matrix XT*X:"
with mr_job1.make_runner() as runner: 
    # Run MrJob MatrixMultiplication Job
    runner.run()
    # Extract the output, i.e. ship the results to the driver; be careful if the shipped data is large
    for line in runner.stream_output():
        key,value =  mr_job1.parse_output_line(line)
        X_Square.append((key,value))
        print key, value
print " " 
# Calculate XT*Y
print "Vector XT*Y:"
with mr_job2.make_runner() as runner: 
    runner.run()
    for line in runner.stream_output():
        key,value =  mr_job2.parse_output_line(line)
        X_Y.append((key,value))
        print key, value
print " "    

# Locally process the output from the two MrJob runs
n = len(X_Y)
if n*n != len(X_Square):
    print 'Error!'
else:
    XX = empty(shape=[n,n])
    for v in X_Square:
        XX[v[0][0],v[0][1]] = v[1]
    XY = empty(shape=[n,1])
    for v in X_Y:
        XY[v[0],0] = v[1]

print XX
print
print XY
        
theta = linalg.solve(XX,XY)
print "Coefficients:",theta[0,0],',',theta[1,0]


Matrix XT*X:
[0, 0] 524534.732204
[0, 1] -580.443195
[1, 0] -580.443195
[1, 1] 1000.0
 
Vector XT*Y:
0 526932.951368
1 -4304.993382
 
[[ 524534.7322035    -580.443195 ]
 [   -580.443195     1000.       ]]

[[ 526932.95136843]
 [  -4304.993382  ]]
Coefficients: 1.00045084002 , -3.72428849998
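
A quick visual check of the fit (a hypothetical cell, overlaying the fitted line on the scatter plot using theta from the driver above):

In [ ]:
# Overlay the fitted line on the data
xs = np.linspace(-40, 40, 100)
pylab.plot(x, y, '*')
pylab.plot(xs, theta[0,0]*xs + theta[1,0], 'r-')  # theta[0,0] = slope, theta[1,0] = intercept
pylab.show()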

Gradient descent - doesn't work as written (see the note after the output)


In [13]:
%%writefile MrJobBatchGDUpdate_LinearRegression.py
from mrjob.job import MRJob

# This MrJob computes the gradient of the squared loss over the entire training set
#     Mapper: accumulate a partial gradient over the examples it sees
#     Reducer: sum the partial gradients and emit the averaged total gradient
class MrJobBatchGDUpdate_LinearRegression(MRJob):
    # run before the mapper processes any input
    def read_weightsfile(self):
        # Read weights file
        with open('weights.txt', 'r') as f:
            self.weights = [float(v) for v in f.readline().split(',')]
        # Initialize the gradient accumulator for this iteration
        self.partial_Gradient = [0]*len(self.weights)
        self.partial_count = 0
    
    # Calculate partial gradient for each example 
    def partial_gradient(self, _, line):
        D = map(float, line.split(','))  # columns: id, y, x
        # y_hat is the predicted value given current weights
        y_hat = self.weights[0]+self.weights[1]*D[1]
        # Update partial gradient vector with the contribution from the current example
        self.partial_Gradient =  [self.partial_Gradient[0]+ D[0]-y_hat, self.partial_Gradient[1]+(D[0]-y_hat)*D[1]]
        self.partial_count = self.partial_count + 1
        #yield None, (D[0]-y_hat,(D[0]-y_hat)*D[1],1)
    
    # Finally emit in-memory partial gradient and partial count
    def partial_gradient_emit(self):
        yield None, (self.partial_Gradient,self.partial_count)
        
    # Accumulate partial gradient from mapper and emit total gradient 
    # Output: key = None, Value = gradient vector
    def gradient_accumulator(self, _, partial_Gradient_Record):
        total_gradient = [0]*2
        total_count = 0
        for partial_Gradient,partial_count in partial_Gradient_Record:
            total_count = total_count + partial_count
            total_gradient[0] = total_gradient[0] + partial_Gradient[0]
            total_gradient[1] = total_gradient[1] + partial_Gradient[1]
        yield None, [v/total_count for v in total_gradient]
    
    def steps(self):
        return [self.mr(mapper_init=self.read_weightsfile,
                       mapper=self.partial_gradient,
                       mapper_final=self.partial_gradient_emit,
                       reducer=self.gradient_accumulator)]
    
if __name__ == '__main__':
    MrJobBatchGDUpdate_LinearRegression.run()


Writing MrJobBatchGDUpdate_LinearRegression.py
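
The mapper_init / mapper / mapper_final trio above is an in-mapper combiner: each map task accumulates its partial gradient in memory and emits a single record at the end, so the reducer only sums one value per task instead of one per example. A hypothetical local trace (it writes a dummy weights.txt first; the real driver below creates that file each iteration):

In [ ]:
# Hypothetical trace of the in-mapper combining pattern
from MrJobBatchGDUpdate_LinearRegression import MrJobBatchGDUpdate_LinearRegression
with open('weights.txt', 'w') as f:
    f.write('0.0,0.0')                          # dummy weights for the trace
job = MrJobBatchGDUpdate_LinearRegression(args=[])
job.read_weightsfile()                          # mapper_init: load the weights
for line in open('LinearRegression.csv').readlines()[:5]:
    job.partial_gradient(None, line)            # mapper: accumulate, emit nothing
for key, value in job.partial_gradient_emit():  # mapper_final: one record
    print key, value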

In [20]:
from numpy import random, array
from MrJobBatchGDUpdate_LinearRegression import MrJobBatchGDUpdate_LinearRegression

learning_rate = 0.05
stop_criteria = 0.000005

# Generate random values as initial weights
weights = array([random.uniform(-3,3),random.uniform(-3,3)])

# Write the weights to a file for the job to read
with open('weights.txt', 'w+') as f:
    f.writelines(','.join(str(j) for j in weights))


# Update the weights iteratively
i = 0
while True:
    # create a mrjob instance for batch gradient descent update over all data
    mr_job = MrJobBatchGDUpdate_LinearRegression(args=['--file', 'weights.txt', 'LinearRegression.csv'])
    
    print "iteration ="+str(i)+"  weights =",weights
    # Save weights from previous iteration
    weights_old = weights
    with mr_job.make_runner() as runner: 
        runner.run()
        # stream_output: access the job's output
        for line in runner.stream_output():
            # value is the gradient value
            key,value =  mr_job.parse_output_line(line)
            # Update weights
            weights = weights - learning_rate*array(value)
    i = i + 1
    if i>100: break
    # Write the updated weights to file 
    with open('weights.txt', 'w+') as f:
        f.writelines(','.join(str(j) for j in weights))
    # Stop if the weights have converged
    if(sum((weights_old-weights)**2)<stop_criteria):
        break
        
print "Final weights\n"
print weights


iteration =0  weights = [ 0.19038759 -2.42536914]
iteration =1  weights = [-24.25303313  35.50722028]
iteration =2  weights = [  -58.0836022   1155.67503173]
iteration =3  weights = [  -334.72145048  34086.6844126 ]
iteration =4  weights = [   -7713.58006353  1002047.25717058]
iteration =5  weights = [  -223814.57459511  29453721.0403005 ]
iteration =6  weights = [ -6.57493399e+06   8.65745611e+08]
iteration =7  weights = [ -1.93255162e+08   2.54472210e+10]
iteration =8  weights = [ -5.68042385e+09   7.47980754e+11]
iteration =9  weights = [ -1.66967055e+11   2.19857095e+13]
iteration =10  weights = [ -4.90773210e+12   6.46235105e+14]
iteration =11  weights = [ -1.44255011e+14   1.89950573e+16]
  ...  (iterations 12 through 99 omitted; the weights continue to grow geometrically)  ...
iteration =100  weights = [ -6.81375262e+144   8.97214040e+146]
Final weights

[ -2.00279465e+146   2.63721855e+148]
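
Why it diverges: three independent problems compound here. First, the CSV columns are id,y,x, so D[0] in partial_gradient is the record id, not the target - the job effectively fits the row index using y as the feature. Second, the mapper accumulates (y - y_hat) terms, which is the negative of the squared-loss gradient, so the driver's update weights - learning_rate*array(value) steps uphill. Third, even with both of those fixed, learning_rate = 0.05 is far too large for these unscaled features: from the XT*X output above, the largest eigenvalue of (1/n)X^T X is about 525, so a stable step size must stay below 2/525 ≈ 0.004.

A minimal local check (a hypothetical cell, not the MrJob pipeline) that batch gradient descent does converge once those three issues are fixed:

In [ ]:
# Local batch gradient descent with the three fixes applied
# (uses x and y from the data-generation step above)
w = array([0.0, 0.0])                  # [intercept, slope]
learning_rate = 0.003                  # below 2/lambda_max ~ 2/525
for step in range(2000):
    y_hat = w[0] + w[1]*x              # predict with the correct columns
    # gradient of the averaged squared loss
    grad = array([(y_hat - y).mean(), ((y_hat - y)*x).mean()])
    w = w - learning_rate*grad         # step against the gradient
print w                                # should approach the closed-form solution above

Because the features are unscaled, the intercept direction converges far more slowly than the slope direction (the eigenvalues of (1/n)X^T X differ by a factor of about 500), which is why normalizing x is the usual first fix before tuning the learning rate.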
